/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker.client;

import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 整个Consumer Group信息
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-26
 */
public class ConsumerGroupInfo {
	private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
	private final String groupName;
	private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
	private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
	private volatile ConsumeType consumeType;
	private volatile MessageModel messageModel;
	private volatile ConsumeFromWhere consumeFromWhere;
	private volatile long lastUpdateTimestamp = System.currentTimeMillis();

	public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
		this.groupName = groupName;
		this.consumeType = consumeType;
		this.messageModel = messageModel;
		this.consumeFromWhere = consumeFromWhere;
	}

	public ClientChannelInfo findChannel(final String clientId) {
		Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
		while (it.hasNext()) {
			Entry<Channel, ClientChannelInfo> next = it.next();
			if (next.getValue().getClientId().equals(clientId)) {
				return next.getValue();
			}
		}

		return null;
	}

	public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
		return subscriptionTable;
	}

	public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
		return channelInfoTable;
	}

	public List<Channel> getAllChannel() {
		List<Channel> result = new ArrayList<Channel>();

		result.addAll(this.channelInfoTable.keySet());

		return result;
	}

	public List<String> getAllClientId() {
		List<String> result = new ArrayList<String>();

		Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();

		while (it.hasNext()) {
			Entry<Channel, ClientChannelInfo> entry = it.next();
			ClientChannelInfo clientChannelInfo = entry.getValue();
			result.add(clientChannelInfo.getClientId());
		}

		return result;
	}

	public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
		ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
		if (old != null) {
			log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
		}
	}

	public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
		final ClientChannelInfo info = this.channelInfoTable.remove(channel);
		if (info != null) {
			log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", info.toString(), groupName);
			return true;
		}

		return false;
	}

	/**
	 * 返回值表示是否发生变更
	 */
	public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
		boolean updated = false;
		this.consumeType = consumeType;
		this.messageModel = messageModel;
		this.consumeFromWhere = consumeFromWhere;

		// 以该链接的Channel从ConsumerGroupInfo.channelInfoTable:ConcurrentHashMap<Channel, ClientChannelInfo>变量中获取ClientChannelInfo对象
		ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
		if (null == infoOld) {
			// 若该对象为空，则将请求参数中的ClientChannelInfo对象存入该Map变量中，并且认为Channel被更新过故置update=true
			ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
			if (null == prev) {
				log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, messageModel, infoNew.toString());
				updated = true;
			}

			infoOld = infoNew;
		} else {
			// 若该对象不为空
			// 则检查已有的ClientChannelInfo对象的ClientId值是否与新传入的ClientChannelInfo对象的ClientId值一致
			if (!infoOld.getClientId().equals(infoNew.getClientId())) {
				log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, //
						infoOld.toString(), //
						infoNew.toString());
				// 若不一致，则替换该渠道信息
				this.channelInfoTable.put(infoNew.getChannel(), infoNew);
			}
		}

		this.lastUpdateTimestamp = System.currentTimeMillis();
		// 更新ClientChannelInfo的时间戳；
		infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);

		return updated;
	}

	/**
	 * 返回值表示是否发生变更
	 */
	public boolean updateSubscription(final Set<SubscriptionData> subList) {
		boolean updated = false;
		// 增加新的订阅关系
		for (SubscriptionData sub : subList) {
			// 以SubscriptionData对象的topic值从ConsumerGroupInfo.subscriptionTable变量中获取已有的SubscriptionData对象
			SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
			if (old == null) {
				// 若获取的SubscriptionData对象为null，则以topic值为key值将遍历到的该SubscriptionData对象存入subscriptionTable变量中
				SubscriptionData prev = this.subscriptionTable.put(sub.getTopic(), sub);
				if (null == prev) {
					// 认为订阅被更新过故置update=true
					updated = true;
					log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
				}
			} else if (sub.getSubVersion() > old.getSubVersion()) {
				// 否则若已有的SubscriptionData对象的SubVersion标记小于新的SubscriptionData对象的SubVersion标记
				if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
					log.info("subscription changed, group: {} OLD: {} NEW: {}", //
							this.groupName, //
							old.toString(), //
							sub.toString()//
					);
				}
				// 更新subscriptionTable变量中已有的SubscriptionData对象
				this.subscriptionTable.put(sub.getTopic(), sub);
			}
		}

		// 删除老的订阅关系
		Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
		while (it.hasNext()) {
			Entry<String, SubscriptionData> next = it.next();
			String oldTopic = next.getKey();

			boolean exist = false;
			for (SubscriptionData sub : subList) {
				// 检查ConsumerGroupInfo.subscriptionTable变量中每个topic
				if (sub.getTopic().equals(oldTopic)) {
					exist = true;
					break;
				}
			}

			if (!exist) {
				log.warn("subscription changed, group: {} remove topic {} {}", //
						this.groupName, //
						oldTopic, //
						next.getValue().toString()//
				);
				/**
				 * 若topic不等于请求参数SubscriptionData集合的每个SubscriptionData对象的topic变量值；
				 * 则从subscriptionTable集合中将该topic的记录删除掉，并且认为订阅被更新过故置update=true；
				 */
				it.remove();
				updated = true;
			}
		}
		// 更新ConsumerGroupInfo对象中的时间戳
		this.lastUpdateTimestamp = System.currentTimeMillis();

		return updated;
	}

	public Set<String> getSubscribeTopics() {
		return subscriptionTable.keySet();
	}

	public SubscriptionData findSubscriptionData(final String topic) {
		return this.subscriptionTable.get(topic);
	}

	public ConsumeType getConsumeType() {
		return consumeType;
	}

	public void setConsumeType(ConsumeType consumeType) {
		this.consumeType = consumeType;
	}

	public MessageModel getMessageModel() {
		return messageModel;
	}

	public void setMessageModel(MessageModel messageModel) {
		this.messageModel = messageModel;
	}

	public String getGroupName() {
		return groupName;
	}

	public long getLastUpdateTimestamp() {
		return lastUpdateTimestamp;
	}

	public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
		this.lastUpdateTimestamp = lastUpdateTimestamp;
	}

	public ConsumeFromWhere getConsumeFromWhere() {
		return consumeFromWhere;
	}

	public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
		this.consumeFromWhere = consumeFromWhere;
	}
}
